home *** CD-ROM | disk | FTP | other *** search
/ Cream of the Crop 26 / Cream of the Crop 26.iso / os2 / pvm34b3.zip / pvm34b3 / pvm3 / src / mppchunk.c < prev    next >
C/C++ Source or Header  |  1997-07-22  |  23KB  |  939 lines

  1.  
  2. static char rcsid[] =
  3.     "$Id: mppchunk.c,v 1.6 1997/06/25 22:09:12 pvmsrc Exp $";
  4.  
  5. /*
  6.  *         PVM version 3.4:  Parallel Virtual Machine System
  7.  *               University of Tennessee, Knoxville TN.
  8.  *           Oak Ridge National Laboratory, Oak Ridge TN.
  9.  *                   Emory University, Atlanta GA.
  10.  *      Authors:  J. J. Dongarra, G. E. Fagg, M. Fischer
  11.  *          G. A. Geist, J. A. Kohl, R. J. Manchek, P. Mucci,
  12.  *         P. M. Papadopoulos, S. L. Scott, and V. S. Sunderam
  13.  *                   (C) 1997 All Rights Reserved
  14.  *
  15.  *                              NOTICE
  16.  *
  17.  * Permission to use, copy, modify, and distribute this software and
  18.  * its documentation for any purpose and without fee is hereby granted
  19.  * provided that the above copyright notice appear in all copies and
  20.  * that both the copyright notice and this permission notice appear in
  21.  * supporting documentation.
  22.  *
  23.  * Neither the Institutions (Emory University, Oak Ridge National
  24.  * Laboratory, and University of Tennessee) nor the Authors make any
  25.  * representations about the suitability of this software for any
  26.  * purpose.  This software is provided ``as is'' without express or
  27.  * implied warranty.
  28.  *
  29.  * PVM version 3 was funded in part by the U.S. Department of Energy,
  30.  * the National Science Foundation and the State of Tennessee.
  31.  */
  32.  
  33. /* File: mppchunk.c -- implement asynchronous, pre-posted receive 
  34. *    buffers for mpp machines.  
  35. *
  36. * NOTE: This code is designed to handle pvm packets (struct pkt) and
  37. * pvm frags (struct frag). The packet and frag structures are very similar.
  38. * So similar, in fact, that a small number of macros can be used to define
  39. * fwd,rev link ptrs, allocation routines, etc. In this code, these are
  40. * generically called chunks.  Daemons use packets while user code use
  41. * fragments. 
  42. *
  43. *
  44. * This code implements virtual sequence numbers to maintain ordered
  45. *    delivery of message packets to the rest of the messaging system.
  46. *
  47. *    for each connected task (O(1000) for PVM daemons) a packet sequence number
  48. *    is maintained. Tasks send their packets in increasing
  49. *    order to the pre-alloced buffers (with wrap when the buffer depth is
  50. *    depleted).  The underlying messaging insures that packets sent in 
  51. *    the same task on the same message tag will arrive at the receiver
  52. *    in order.  However, difficulty arises when multiple tasks are 
  53. *    sharing the buffers. A receive must be processed and then information 
  54. *    about who sent the message is extracted. In this scenario, it is
  55. *    likely that packets will be received out of order. An example follows
  56. *    for a buffer depth of four and two tasks A, B. The sequence numbers indicate
  57. *    the order in which the packets are sent. A has sent 4 packets, B has sent
  58. *    7 packets 
  59. *
  60. *    BUF1:     A(0)  B(0) B(4)   
  61. *    BUF2:    B(1)  A(1) B(5)
  62. *    BUF3:   A(2)  B(2) B(6)
  63. *    BUF4:    A(3)  B(3)
  64. *
  65. *    If round robin message probing is used, then packets are received in the
  66. *    following order:
  67. *
  68. *    A(0) B(1) A(2) A(3) B(0) A(1) B(2) B(3) B(4) B(5) B(6) 
  69. *
  70. *    In this case B(0) is received before B(1), A(2) and A(3) are both
  71. *    received before A(1).  
  72. *
  73. *    By keeping track of what the current sequence number is for each task,
  74. *    message delivery order can be maintained.
  75. */
  76.  
  77.  
  78. /* NEEDED Support Routines:
  79.  * 
  80.  * struct msgid * pvm_mpp_get_precvids()
  81.  *
  82.  *         gets a pointer to head of list for post precv(s).
  83.  *        for daemons, this can return (struct msgid) NULL 
  84.  *
  85.  * MPP_DIRECTI_PTR pvm_find_direct(ostructs, nstructs, src)))
  86.  *         MPP_DIRECTI_PTR ostructs;
  87.  *        int nstructs;
  88.  *        int src;
  89.  *
  90.  *        ostructs - the start of an array of ordering structs. 
  91.  *        nstructs - length of ostructs array
  92.  *        src -    tid or node # that we are trying to find the ordering struct
  93. */
  94.  
  95.  
  96.  
  97. #include <stdio.h>
  98. #include <stdlib.h>
  99. #include <sys/types.h>
  100. #include <sys/time.h>
  101. #include <ctype.h>
  102. #include <pvm3.h>
  103. #include <pvmproto.h>
  104. #include "global.h"
  105. #include "pvmalloc.h"
  106. #include "listmac.h"
  107. #include "lmsg.h"
  108. #include "mppmsg.h"
  109. #include "mppchunk.h"
  110. #include "pvmmimd.h"
  111. #include "bfunc.h"
  112.  
  113. /* --- external declarations */
  114. extern int errno;
  115. extern int pvmdebmask;           /* from pvmd.c */
  116.  
  117. /* Global Variables that can be Changed */
  118. int MAXSEQ = DFN_MAXSEQ;
  119.  
  120. /* Private variables */
  121. static char mpperrtext[128];
  122.  
  123. /* Prototypes for externals */
  124. MPP_DIRECTI_PTR pvm_find_direct __ProtoGlarp__((MPP_DIRECTI_PTR, int , int));
  125. struct msgid * pvm_mpp_get_precvids __ProtoGlarp__( () ); 
  126.  
  127. /* ======== new_directstruct ======== */
  128. /* Allocate and fill and a new MPP_DIRECTI struct */
  129.  
  130. MPP_DIRECTI_PTR 
  131. new_directstruct( nrbufs, nsbufs )
  132. int nrbufs;  /* number of receive buffers */
  133. int nsbufs;     /* number of send buffers */
  134. {
  135.     MPP_DIRECTI_PTR new;
  136.     int i;
  137.     
  138.     if ( !( new = (MPP_DIRECTI_PTR) TALLOC(1, MPP_DIRECTI, "new_directstruct") )
  139.             || ! (new -> ordering = new_chunkostruct( nrbufs )))
  140.     {
  141.         pvmlogerror("new_directstruct: can't get memory\n");
  142.         pvmbailout(PvmNoMem);    
  143.         return  (MPP_DIRECTI_PTR) NULL;
  144.     }
  145.  
  146.     init_directstruct( new,  nsbufs);
  147.     
  148.     return new;
  149.  
  150. }
  151.  
  152. /* ======== init_directstruct ======== */
  153. int
  154. init_directstruct( dstruct , nsbufs )
  155. MPP_DIRECTI_PTR dstruct;
  156. int nsbufs;
  157. {
  158.     /* Initialize the variables. Set unknown values to -1 */
  159.     dstruct -> rtid = -1;
  160.     dstruct -> rpid = -1;
  161.     dstruct -> tagbase = -1;
  162.     dstruct -> nbufs = nsbufs;
  163.     dstruct -> sseq = -1;
  164.  
  165.     return PvmOk;
  166. }
  167.     
  168.  
  169. /* ======== fill_directstruct ======== */
  170. int
  171. fill_directstruct( dstruct , nsbufs, rtid, rpid, tagbase, seq , appid)
  172. MPP_DIRECTI_PTR dstruct;
  173. int nsbufs;
  174. int rtid;
  175. int rpid;
  176. int tagbase;
  177. int seq;
  178. int appid;
  179. {
  180.     /* Initialize the variables. Set unknown values to -1 */
  181.  
  182.     dstruct -> rtid = rtid;
  183.     dstruct -> rpid = rpid;
  184.     dstruct -> tagbase = tagbase;
  185.     dstruct -> nbufs = nsbufs;
  186.     dstruct -> sseq = seq;
  187.     dstruct -> appid = appid;
  188.  
  189.     return PvmOk;
  190. }
  191.     
  192. /* ======== new_vdirectstruct ======== */
  193. /* Allocate and fill n new MPP_DIRECTI structs, return an
  194.     array of MPP_DIRECTI structs */
  195.  
  196. MPP_DIRECTI_PTR 
  197. new_vdirectstruct( nstructs, nsbufs, nrbufs )
  198. int nsbufs;
  199. int nrbufs;
  200. {
  201.     MPP_DIRECTI_PTR vnew, current;
  202.     CHUNK_ORDER_PTR ostructs;
  203.     int i,j;
  204.     
  205.     if (  !( vnew = (MPP_DIRECTI_PTR) TALLOC(nstructs, MPP_DIRECTI, "new_vdirectstruct") ) 
  206.              || !( ostructs = new_vchunkostruct( nstructs, nrbufs ) ) )
  207.     {
  208.         pvmlogerror("new_vdirectstruct: can't get memory\n");
  209.         pvmbailout(PvmNoMem);    
  210.         return  (MPP_DIRECTI_PTR) NULL;
  211.     }
  212.  
  213.     /* Initialize the variables. Set unknown values to -1 */
  214.  
  215.     for (j = 0; j < nstructs; j++)
  216.     {
  217.         current = vnew + j;
  218.  
  219.         init_directstruct( current, nsbufs);
  220.  
  221.         /* point to one of the ordering structs that we just malloc'ed */
  222.         current -> ordering = ostructs + j;
  223.     }
  224.  
  225.  
  226.     return vnew;
  227. }
  228. /* ======== new_chunkostruct ======== */
  229. /* Allocate and fill and a new CHUNK_ORDER struct */
  230.  
  231. CHUNK_ORDER_PTR 
  232. new_chunkostruct( nbufs )
  233. int nbufs;
  234. {
  235.     CHUNK_ORDER_PTR new;
  236.     int i;
  237.     
  238.     if ( !( new = (CHUNK_ORDER_PTR) TALLOC(1, CHUNK_ORDER, "new_chunkostruct") ) 
  239.              || !( new->bufseq = (int *) TALLOC(nbufs, int, "new_chunkostruct") ) )
  240.     {
  241.         pvmlogerror("new_chunkostruct: can't get memory\n");
  242.         pvmbailout(PvmNoMem);    
  243.         return  (CHUNK_ORDER_PTR) NULL;
  244.     }
  245.  
  246.     /* init the sequence #'s and buffer sequence #'s, nbufs */
  247.  
  248.     init_chunkostruct( new, nbufs );
  249.  
  250.     return new;
  251. }
  252.  
  253.  
  254. /* ======== new_vchunkostruct ======== */
  255. /* Allocate and fill n new CHUNK_ORDER structs , do it in a couple of mallocs
  256.     so that initializing a large number of structs is done efficiently */
  257.  
  258. CHUNK_ORDER_PTR 
  259. new_vchunkostruct( nstructs, nbufs )
  260. int nstructs;
  261. int nbufs;
  262. {
  263.     CHUNK_ORDER_PTR vnew, current;
  264.     int *bufs;
  265.     int i,j;
  266.     
  267.     if (  !( vnew = (CHUNK_ORDER_PTR) TALLOC(nstructs, CHUNK_ORDER, "new_vchunkostruct") ) 
  268.              || !( bufs = (int *) TALLOC(nbufs * nstructs, int, "new_vchunkostruct") ) )
  269.     {
  270.         pvmlogerror("new_vchunkostruct: can't get memory\n");
  271.         pvmbailout(PvmNoMem);    
  272.         return  (CHUNK_ORDER_PTR) NULL;
  273.     }
  274.  
  275.     /* init the sequence #'s and buffer sequence #'s, nbufs */
  276.  
  277.     for (j = 0; j < nstructs; j++)
  278.     {
  279.         current = vnew + j;
  280.         current -> bufseq = bufs + ( nbufs * j ); 
  281.         init_chunkostruct( current, nbufs );
  282.     }
  283.  
  284.     return vnew;
  285. }
  286.  
  287. /* ======== init_chunkostruct ======== */
  288. int
  289. init_chunkostruct( ostruct , nrbufs  )
  290. CHUNK_ORDER_PTR ostruct;
  291. int nrbufs;
  292. {
  293.     int i;
  294.  
  295.     /* Initialize the variables. Set unknown values to -1 */
  296.     ostruct -> nbufs = nrbufs;
  297.     ostruct -> seq = 0;
  298.     for (i = 0; i < nrbufs; i ++)
  299.         ostruct->bufseq[i] = i;    
  300.  
  301.     ostruct->oochunks = (CHUNK_PTR) NULL;    
  302.  
  303.     return PvmOk;
  304. }
  305.  
  306. /* ======== order_chunk  ======== */
  307. /* take a buffer number and pkt_order struct and a pkt, 
  308.     place the pkt on the oopkts list, update sequence #'s.
  309. */
  310. int
  311. order_chunk( ostruct, ibuf, inchunk )
  312. CHUNK_ORDER_PTR ostruct;
  313. int ibuf;
  314. CHUNK_PTR inchunk;  
  315. {
  316.     int *obseq;        /* order buffer sequence number */
  317.  
  318.     CHUNK_PTR firstchunk, tstchunk;    
  319.     obseq = ostruct->bufseq + ibuf;
  320.  
  321.     inchunk->CHUNK_SEQ = *obseq; 
  322.  
  323.  
  324.     /* store next packet sequence # to arrive in this buffer 
  325.         from this sender*/
  326.     *obseq = NEXTCHUNKSEQ(*obseq, ostruct->nbufs);
  327.  
  328.     if (ostruct -> oochunks == (CHUNK_PTR) NULL)
  329.     {
  330.         ostruct -> oochunks = inchunk;
  331.         inchunk -> CHUNK_NEXT = inchunk -> CHUNK_PREV = inchunk;
  332.     }
  333.     else
  334.     {
  335.         firstchunk = tstchunk = ostruct->oochunks;
  336.         while (SEQLESSTHAN(tstchunk->CHUNK_SEQ, inchunk -> CHUNK_SEQ)
  337.                 && tstchunk -> CHUNK_NEXT != firstchunk)
  338.             tstchunk = tstchunk -> CHUNK_NEXT;
  339.  
  340.         if (SEQLESSTHAN(tstchunk ->CHUNK_SEQ, inchunk->CHUNK_SEQ))
  341.         {
  342.             LISTPUTAFTER(tstchunk, inchunk, CHUNK_NEXT, CHUNK_PREV);
  343.         }
  344.         else
  345.         {
  346.             LISTPUTBEFORE(tstchunk, inchunk, CHUNK_NEXT, CHUNK_PREV);
  347.             if (tstchunk == firstchunk) /* new first of the list ? */
  348.                 ostruct -> oochunks = inchunk;
  349.         }
  350.     }
  351.  
  352.     if (pvmdebmask & (PDMPACKET | PDMNODE))
  353.     {
  354.         sprintf(mpperrtext,"Ordering packet seq %d from tid %x\n", 
  355.                 inchunk -> CHUNK_SEQ, inchunk -> CHUNK_SRC); 
  356.         pvmlogerror(mpperrtext);
  357.     }
  358.     return 0;
  359. }    
  360.  
  361. /* ======== ochunk_delete  ======== */
  362. /* delete a pkt from the oopkts struct if any. Adjust the
  363.     seq number.
  364.     returns NULL if no packet is ready.
  365. */
  366. CHUNK_PTR
  367. ochunk_delete( ostruct )
  368. CHUNK_ORDER_PTR ostruct;
  369. {
  370.     CHUNK_PTR tstchunk;
  371.  
  372.     if ( !ostruct || ! (ostruct->oochunks))
  373.         return (CHUNK_PTR) NULL;
  374.  
  375.     tstchunk = ostruct->oochunks;
  376.  
  377.     if (tstchunk -> CHUNK_SEQ == ostruct -> seq)  /* yep, we got one */
  378.     {
  379.         ostruct->seq = NEXTCHUNKSEQ(ostruct -> seq, 1);
  380.         if (tstchunk -> CHUNK_NEXT == tstchunk)
  381.             ostruct -> oochunks = (CHUNK_PTR) NULL;
  382.         else
  383.         {
  384.             ostruct -> oochunks = tstchunk -> CHUNK_NEXT;
  385.             LISTDELETE(tstchunk, CHUNK_NEXT, CHUNK_PREV);
  386.         }
  387.  
  388.         if (pvmdebmask & (PDMPACKET | PDMNODE))
  389.         {
  390.             sprintf(mpperrtext,"ochunk_delete() packet seq %d from tid %x\n", 
  391.                     tstchunk -> CHUNK_SEQ, tstchunk -> CHUNK_SRC); 
  392.             pvmlogerror(mpperrtext);
  393.         }
  394.  
  395.         tstchunk -> CHUNK_NEXT = tstchunk -> CHUNK_PREV = (CHUNK_PTR) NULL;
  396.         tstchunk -> CHUNK_SEQ = 0;
  397.         
  398.         return tstchunk;
  399.     }    
  400.  
  401.     return (CHUNK_PTR) NULL;
  402. }
  403. /* ======== init_recv_list  ======== */
  404. /* allocate a message receive array and post the actual receives */
  405.  
  406. MSG_INFO_PTR
  407. init_recv_list( nbufs, tagbase, msize, hsize, appid, llmsg )
  408. int nbufs, tagbase, msize, hsize;
  409. int appid;
  410. MSGFUNC_PTR llmsg;
  411. {
  412.     MSG_INFO_PTR new, tmp;
  413.     CHUNK_PTR msgchunk;
  414.     char * datap;
  415.     int i;    
  416.     int cc;
  417.     int mrsize; /* message receive size */
  418.  
  419.     if (  !( new = (MSG_INFO_PTR) TALLOC(nbufs, MSG_INFO, "init_recv_list") ) ) 
  420.     {
  421.         pvmlogerror("init_recv_list: couldn't allocate memory \n");
  422.         return (MSG_INFO_PTR) NULL;
  423.     }
  424.  
  425.     tmp = new;
  426.     
  427.     for (i = 0; i < nbufs; i ++)
  428.     {
  429.         cc = post_receive(tmp, -1, tagbase + i, msize, hsize, MPPANY, appid,
  430.             llmsg);
  431.         tmp ++;
  432.         if (cc < 0)
  433.         {    
  434.             sprintf(mpperrtext,"init_recv_list(): post_receive failed! src %d tag %d msize %d hsize %d pid %d appid %d : nbufs %d buffer %d \n", 
  435.             -1, tagbase + i, msize, hsize, MPPANY, appid, nbufs, i);
  436.         
  437.             pvmlogerror(mpperrtext);
  438.         }
  439.     }
  440.  
  441.     return new;
  442. }
  443.  
  444. /* ======= post_receive ======== */    
  445. /* allocs a new packet and posts an asynchronous receive for the message */ 
  446. int
  447. post_receive(imsg, src, tag, mxsize, headroom, ptype, appid, llmsg)
  448. MSG_INFO_PTR imsg;
  449. int src, tag, mxsize, headroom;
  450. int ptype;
  451. int appid;
  452. MSGFUNC_PTR llmsg;
  453. {
  454.     int cc;
  455.  
  456.     char *datap;
  457.     char errtxt[128];
  458.  
  459.     CHUNK_PTR msgchunk;
  460.     
  461.      if ( !(msgchunk = CHUNK_NEW(mxsize)) )
  462.     {
  463.         pvmlogerror("post_recv: couldn't allocate memory \n");
  464.         return PvmNoMem;
  465.     }    
  466.  
  467.     datap =  msgchunk->CHUNK_DAT + headroom; 
  468.  
  469.     imsg -> rchunk = msgchunk;        /* PVM packet */
  470.     imsg -> src = src; 
  471.     imsg -> tag = tag;    /* tag for this buffer */
  472.     imsg -> mxsize = mxsize;    /* max msg size for this buffer */ 
  473.     imsg -> hsize =  headroom;     /* header size (data offset) */
  474.     
  475.     cc = (*llmsg->imsgrecv)(appid, src, tag, datap, mxsize - headroom, 
  476.             ptype, imsg->info, &(imsg->mid));
  477.  
  478.     if ( cc < 0)
  479.     {
  480.         
  481.         sprintf(errtxt, "post_recv: bad msg id (%d). Fatal! \n", cc);
  482.         pvmlogerror(errtxt);
  483.         return PvmSysErr;
  484.     }    
  485. }        
  486.  
  487. /* ======= post_send ======== */    
  488. /* Given a packet,length, and a MPP_DIRECTI structure, send a message
  489. to the recipient. Info used from the MPP_DIRECTI struct is
  490.     rtid;
  491.     tagbase;
  492.     nbufs;
  493.     sseq;  
  494.  
  495. Returns:
  496.     a positive mid if the send was successful.
  497.     a negative mid if the send was unsuccessful.
  498.  
  499.     sequence #s in the MPP_DIRECTI struct are only advanced if 
  500.     the send was successful.
  501. */ 
  502. msgmid_t
  503. post_send(buffer, len, distruct, llmsg)
  504. char *buffer;
  505. int len;
  506. MPP_DIRECTI_PTR distruct;
  507. MSGFUNC_PTR llmsg;
  508. {
  509.     int appid;
  510.     int cc;
  511.     int dest;
  512.     int ptype;
  513.     int rtid;
  514.     int tag; 
  515.  
  516.     msgmid_t mid;
  517.  
  518.     if (distruct == (MPP_DIRECTI_PTR) NULL)
  519.         return (-1);
  520.  
  521.     rtid = distruct->rtid;
  522.  
  523.     tag = distruct -> sseq % distruct -> tagbase;
  524.  
  525.     dest = (distruct -> rtid) & TIDNODE;
  526.  
  527.     ptype = (distruct -> rpid); 
  528.     
  529.     appid = (distruct -> appid);
  530.  
  531.     /* Increment the send sequence number if the send is successful */
  532.     if ( (cc =  (*llmsg->imsgsend)(appid, tag, buffer, len, dest, ptype, &mid)) >= 0 )
  533.         distruct -> sseq = INCRSEQ( distruct->sseq, MAXSEQ); 
  534.  
  535.     return mid;
  536. }    
  537.     
  538. /* ======= read_chunk ======== */    
  539. /*  reads the chunk indicated by the msg_info structure and 
  540.     then re-posts a receive to takes its place. Will not block
  541.     if the message has completed, so can be used as a "probe with
  542.     receive"
  543.  
  544.     if repost > 0; then repost a receive with the same parameters;
  545. */
  546. CHUNK_PTR
  547. read_chunk( imsg, src, tag, len , repost, appid, llmsg)
  548. MSG_INFO_PTR imsg;
  549. int *src;
  550. int *tag;
  551. int *len;
  552. int repost;
  553. int appid;
  554. MSGFUNC_PTR llmsg;
  555. {
  556.     CHUNK_PTR rchunk= (CHUNK_PTR) NULL;
  557.  
  558.     if (imsg && imsg->rchunk && imsg->mid >= 0 
  559.             && (*llmsg->msgdone)(appid, &(imsg->mid), imsg->info))
  560.  
  561.     {
  562.         *src = (*llmsg->msgsrc)(imsg->info);
  563.         *len = (*llmsg->msglen)(imsg->info);
  564.         *tag = (*llmsg->msgtag)(imsg->info);
  565.         rchunk = imsg -> rchunk;
  566.  
  567.         if (pvmdebmask & PDMPACKET)
  568.         {
  569.             sprintf(mpperrtext,"read-chunk(): node %d, len %d, tag %d \n", 
  570.                     *src, *len, *tag);
  571.             pvmlogerror(mpperrtext);
  572.         }
  573.  
  574.         if (repost)
  575.             post_receive(imsg, imsg->src, imsg->tag, imsg->mxsize, imsg->hsize,                     MPPANY, appid, llmsg);
  576.     }
  577.         
  578.     return rchunk;        
  579. }
  580.  
  581.  
  582. /* -------- pvm_chunkReady() --------- */
  583. /* probe to see if any chunks have arrived. Chunks are in pre-posted
  584.  * buffers numbered 0, 1, ..., (nbufs-1). 
  585.  * pvm_chunkReady() will return a chunk if it is completed.
  586.  
  587.  * chunkReady handles inplace reception of messages by examining the
  588.  * header structure of the chunk. It allocates more memory if it needs
  589.  *
  590.  * Input:
  591.  *    ppmsg - vector of pre-posted receive MSG_INFO pointers (length == nbufs)
  592.  *  nbufs - length of ppmsg array
  593.  *  mfunc - low level message passing primitive functions to use
  594.  *  ostructs - ordering structures 
  595.  *  nstructs - number of ordering structures
  596.  *  *cbuf - current buffer to probe in the ppmsg array. Updated by pvm_frReady
  597.  *  **hdReadyList - list of ready chunks, updated by pvm_chunkReady
  598.  *
  599.  * Return:
  600.  *  rcp - pointer to ready chunk. NULL if none ready
  601. */
  602. CHUNK_PTR
  603. pvm_chunkReady(ppmsg, nbufs, mfunc, ostructs, nstructs, cbuf, hdReadyList )
  604. MSG_INFO_PTR ppmsg;    /* vector of pre-posted message buffers */
  605. int nbufs;                /* number of bufs in ppsmg */
  606. MSGFUNC_PTR mfunc;        /* structure of message passing primitives */
  607. MPP_DIRECTI_PTR    ostructs;
  608. int nstructs;
  609. int *cbuf;                /* buffer to probe in ppmsg vector */
  610. CHUNK_PTR *hdReadyList;
  611.  
  612. {
  613.     char *cp;
  614.     char errtxt[64];
  615.  
  616.     MPP_DIRECTI_PTR tcon;
  617.  
  618.     int cc;
  619.     int i, ipctxt, ipff, iplen, ipsrc, iptag;
  620.     int flen;
  621.     int len;
  622.     int match;
  623.     int src;
  624.     int tag, tpff, tpsrc; 
  625.     
  626.     CHUNK *rcp, *rcp2; 
  627.  
  628.     struct msgid *precvMsg, *precvIds;
  629.  
  630.     if (*hdReadyList)
  631.         if ((*hdReadyList)->CHUNK_NEXT != *hdReadyList)
  632.         {
  633.              rcp = (*hdReadyList)->CHUNK_NEXT;
  634.             LISTDELETE(rcp, CHUNK_NEXT, CHUNK_PREV);
  635.             return rcp;
  636.         }
  637.  
  638.     /*  check if there is a ready chunk in the preposted buffers.  
  639.         repost the receive for the next time around */ 
  640.  
  641.     for (i = 0; i < nbufs; i ++)
  642.     {
  643.         if ( *cbuf >= nbufs || *cbuf < 0) 
  644.             *cbuf = 0;
  645.  
  646.         if (rcp = read_chunk(ppmsg + (*cbuf), &src, &tag, &len, MPPREPOST, 
  647.                 MPPANY, mfunc))     
  648.         {
  649.             rcp->CHUNK_LEN = len;     /* remember how much data was sent */
  650.  
  651.             rcp->CHUNK_SRC = src;     /* this will be a node #, not a tid ! */
  652.  
  653.             if (pvmdebmask & (PDMNODE | PDMPACKET))
  654.             {
  655.                 tpff = pvmget8(rcp->CHUNK_DAT + 12);
  656.  
  657.                 tpsrc = pvmget32(rcp->CHUNK_DAT + 4);
  658.  
  659.                 sprintf(errtxt, "readyChunk()read_chunk src %x node %d len %d ff %d buffer %d\n",
  660.                         tpsrc, src, len, tpff, *cbuf);
  661.                 pvmlogerror(errtxt);
  662.  
  663.             }
  664.  
  665.  
  666.             /* find the ordering structure for this task, put this chunk
  667.                 on this tasks reorder queue */
  668.  
  669.             if ( !(tcon =  pvm_find_direct(ostructs, nstructs, src)))
  670.             {
  671.                 pvmlogerror("frReady() could not find ordering struct \n");    
  672.                 return (CHUNK_PTR) NULL;
  673.             }
  674.             else
  675.                 order_chunk(tcon->ordering, *cbuf, rcp); /* put on order q */
  676.         
  677.             /*  remove in-order chunks on for this task, put them in 
  678.                 callers readyFrags queue */
  679.  
  680.             while ( rcp = ochunk_delete(tcon->ordering)) /* de-queue chunks */
  681.             {
  682.                 if (!*hdReadyList)   /* readyList is empty, initialize */
  683.                     *hdReadyList = CHUNK_NEW(0);
  684.  
  685.  
  686.                 if (pvmdebmask & (PDMNODE | PDMPACKET))
  687.                     pvmlogprintf("dequeing chunk from %x len %d\n", rcp->CHUNK_SRC, rcp->CHUNK_LEN);    
  688.                 /* XXX - This next part is really ugly! It checks if this
  689.                  *    chunk is an inplace header and if there is a 
  690.                  *    precv outstanding for it. If there is, then the message
  691.                   *    can go directly into the user memory. 
  692.                  *
  693.                  * for daemons, it is sufficient to set precvIds to NULL
  694.                  * This has to be done *AFTER* the frag header has been
  695.                  * dequeued from the readyList to preserve ordering
  696.                 */
  697.     
  698.                 if ( (flen = pvmget32(rcp->CHUNK_DAT + 8 ) + TDFRAGHDR)  
  699.                         != rcp->CHUNK_LEN )
  700.                 {
  701.                     ipff = pvmget8(rcp->CHUNK_DAT + 12);
  702.     
  703.                     match = 0;
  704.     
  705.                     if (ipff  & FFSOM ) /* start of message, check for precv*/ 
  706.                     {
  707.                         ipsrc = pvmget32(rcp->CHUNK_DAT + 4);
  708.     
  709.                         iptag = pvmget32(rcp->CHUNK_DAT + TDFRAGHDR + 4);
  710.     
  711.                         ipctxt = pvmget32(rcp->CHUNK_DAT + TDFRAGHDR + 8);
  712.     
  713.                          if     (precvMsg = pvm_mpp_get_precvids()) 
  714.                         {
  715.                             if ( match = (((precvMsg->otid == -1 || ipsrc == precvMsg->otid)
  716.                                     && (precvMsg->tag  == -1 || iptag == precvMsg->tag)
  717.                                     && (ipctxt == precvMsg->ctxt)) ? 1 : 0) )
  718.                             {
  719.                                 cc = inplaceRecv( src, src, precvMsg->ubuf, 
  720.                                     precvMsg->len, &iplen, mfunc);    
  721.                                 precvMsg->len = iplen;
  722.                                 precvMsg->complete = 1; /* got it ! */
  723.                                 rcp->CHUNK_RIP = 1;     /* received in place */
  724.                             }
  725.                         }
  726.                     }
  727.     
  728.                     if (!match)
  729.                     {
  730.                         if ( flen > rcp->CHUNK_MAX ) /* won't fit */
  731.                         {
  732.                             rcp2 = CHUNK_NEW(flen);
  733.     
  734.                             BCOPY(rcp->CHUNK_DAT, rcp2->CHUNK_DAT, rcp->CHUNK_LEN);
  735.     
  736.                             rcp2->CHUNK_SRC = rcp->CHUNK_SRC;
  737.     
  738.                             rcp2->CHUNK_LEN = rcp->CHUNK_LEN;
  739.     
  740.                             CHUNK_FREE(rcp);
  741.     
  742.                             rcp = rcp2;
  743.             
  744.                         }
  745.     
  746.                         cc = inplaceRecv(src, src, rcp->CHUNK_DAT + rcp->CHUNK_LEN,
  747.                             rcp->CHUNK_MAX - rcp->CHUNK_LEN, &iplen, mfunc);
  748.     
  749.                         rcp->CHUNK_LEN += iplen;
  750.     
  751.                     }
  752.                          
  753.                 }
  754.  
  755.                 LISTPUTBEFORE(*hdReadyList, rcp, CHUNK_NEXT, CHUNK_PREV);
  756.             }
  757.  
  758.             (*cbuf) ++;        /* increment to probe the next buffer */
  759.             break;
  760.         }
  761.  
  762.         (*cbuf) ++;        /* increment to probe the next buffer */
  763.         
  764.     }
  765.  
  766.  
  767.     /* if ordered frags have been put into the global ready list, then
  768.         dequeue one of them */
  769.  
  770.     if (*hdReadyList && (*hdReadyList)->CHUNK_NEXT != *hdReadyList)
  771.     {
  772.          rcp = (*hdReadyList)->CHUNK_NEXT;
  773.         LISTDELETE(rcp, CHUNK_NEXT, CHUNK_PREV);        
  774.         if (pvmdebmask & (PDMNODE | PDMPACKET))
  775.             pvmlogprintf(" chunk  from hdready from %x len %d\n", rcp->CHUNK_SRC, rcp->CHUNK_LEN);    
  776.         return rcp;
  777.     }
  778.     else
  779.         return (CHUNK_PTR) NULL;
  780.  
  781. }
  782.  
  783. /* -------  inplaceRecv() ------- */
  784. int
  785. inplaceRecv(src, tag, buf, len, rlen, llmsg)
  786. int src;
  787. int tag;
  788. char *buf;
  789. int len;
  790. int *rlen;
  791. MSGFUNC_PTR llmsg;
  792. {
  793.      MSG_INFO imsg;        
  794.  
  795.     int appid = 0;        /* ignored by tasks */
  796.     int cc;
  797.     int ptype = 0;        /* ouch! XXX. Fix this ! */
  798.  
  799.     char logtxt[64];
  800.  
  801.      cc = (*llmsg->imsgrecv)(0, src, tag, buf,len, ptype, imsg.info, &(imsg.mid));
  802.  
  803.     if (cc < 0)
  804.     {
  805.         sprintf(logtxt," inplaceRecv(): bad mid (%d)\n", cc);
  806.         pvmlogerror(logtxt);
  807.         return PvmSysErr;
  808.     }
  809.  
  810.     while ( (cc = (*llmsg->msgdone)(appid, &(imsg.mid), imsg.info)) == 0 )
  811.         /* wait for recv to finish */ ; 
  812.  
  813.     if (cc < 0)
  814.      {
  815.         sprintf(logtxt,"inplaceRecv(): error on msgdone (%d)\n", cc);
  816.         pvmlogerror(logtxt);
  817.         return PvmSysErr;
  818.     }
  819.  
  820.     *rlen = (*llmsg->msglen)(imsg.info);
  821.  
  822.     return 0;
  823.  
  824. }
  825.  
  826.  
  827. /* ===========  Look for a send mid that has finished  ========= */
  828. /* This routine checks to see if there is a free slot to describe
  829.    a chunk (pkt or frag) that has been asychronously sent.  If
  830.    the available slots (num) are taken, it checks to see if
  831.    the message has completed. If it has, then it frees the chunk
  832.   
  833.    returns 
  834.         >= 0        successful
  835.         < 0            no free descriptors left, none of the previous ones
  836.                     have completed.   
  837. */
  838. int
  839. pvm_mpp_find_midx(mppsendmids, mppoutchunks, current, num, mfunc )
  840. msgmid_t *mppsendmids;
  841. CHUNK_PTR *mppoutchunks;
  842. int *current;
  843. int num;
  844. MSGFUNC_PTR mfunc;
  845. {
  846. int foundidx;
  847. int i, idx;
  848. int appid = 0;            /** XXXX fix this for PGONPUMA */
  849. info_t minfo[MPPINFOSIZE];
  850.  
  851.     foundidx = 0;
  852.  
  853.     idx = *current;
  854.     
  855.  
  856.     for (i = 0; i < num; i++ )
  857.     {
  858.  
  859.         /* Check if a previous send has finished */
  860.         if (mppsendmids[idx] >= 0)
  861.         {
  862.             if ((*mfunc->msgdone)(appid, &(mppsendmids[idx]), minfo) )
  863.             {
  864.                 mppsendmids[idx] = (msgmid_t) -1; 
  865.                 if (mppoutchunks[idx])
  866.                 {
  867.                     CHUNK_FREE(mppoutchunks[idx]);        
  868.                     mppoutchunks[idx] = (CHUNK_PTR) NULL;
  869.                 }
  870.                 foundidx = 1;
  871.             }
  872.         }
  873.         else 
  874.         {
  875.             if (mppsendmids[idx] == (msgmid_t) MPPMIDFREE)
  876.                 foundidx = 1;
  877.         }
  878.         
  879.         if (foundidx)
  880.             break;
  881.  
  882.         if (++idx >= num)
  883.              idx = 0;
  884.     }
  885.  
  886.     if (foundidx)
  887.     {
  888.         mppsendmids[idx] = (msgmid_t) MPPMIDALLOCED;
  889.  
  890.         if ( (*current = idx + 1) >= num)
  891.             *current = 0;
  892.  
  893.         return (idx);
  894.     }
  895.     else
  896.         return (-1);
  897. }
  898.  
  899. /* ============ pvm_assign_mid =========== */
  900. /* set the mid in the mppsendmids array */
  901. int
  902. pvm_assign_mid(mppsendmids, mid, idx)
  903. msgmid_t *mppsendmids;
  904. msgmid_t mid;
  905. int idx;
  906. {
  907.     mppsendmids[idx] = mid;
  908.     return 0;
  909. };
  910.  
  911.  
  912. /* ============ pvm_assign_chunk =========== */
  913. /* set the chunk pointer in the in the mppoutchunks array */
  914. int
  915. pvm_assign_chunk(mppoutchunks, icp, idx)
  916. CHUNK_PTR *mppoutchunks;
  917. CHUNK_PTR icp;
  918. int idx;
  919. {
  920.     mppoutchunks[idx] = icp;
  921.     return 0;
  922. }
  923.  
  924. /* ========== pvm_init_asynch_list ========== */
  925. int 
  926. pvm_init_asynch_list(outmids, outchunks, num)
  927. msgmid_t *outmids;
  928. CHUNK_PTR *outchunks;
  929. int num;
  930. {
  931. int i;
  932.     for (i = 0 ; i < num ; i ++ )
  933.     {
  934.         outmids[i] = (msgmid_t) MPPMIDFREE;
  935.         outchunks[i] = (CHUNK_PTR) NULL;
  936.     }
  937.     return 0;
  938. }
  939.